/*
 * Copyright (c) 2022-2024 KCloud-Platform-IoT Author or Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.client.consumer;

import lombok.Data;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

/**
 * In most scenarios, this is the mostly recommended class to consume messages.
 * Technically speaking, this push client is virtually a wrapper of the underlying pull
 * service. Specifically, on arrival of messages pulled from brokers, it roughly invokes
 * the registered callback handler to feed the messages. See quickstart/Consumer in the
 * example module for a typical usage.
 *
 * <p>
 * <strong>Thread Safety:</strong> After initialization, the instance can be regarded as
 * thread-safe.
 * </p>
 *
 * @author laokou
 */
@Data
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

	private final Logger log = LoggerFactory.getLogger(DefaultMQPushConsumer.class);

	/**
	 * Internal implementation. Most of the functions herein are delegated to it.
	 */
	protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

	/**
	 * Consumers of the same role is required to have exactly same subscriptions and
	 * consumerGroup to correctly achieve load balance. It's required and needs to be
	 * globally unique. See
	 * <a href="https://rocketmq.apache.org/docs/introduction/02concepts">here</a> for
	 * further discussion.
	 */
	private String consumerGroup;

	/**
	 * Message model defines the way how messages are delivered to each consumer clients.
	 * RocketMQ supports two message models: clustering and broadcasting. If clustering is
	 * set, consumer clients with the same {@link #consumerGroup} would only consume
	 * shards of the messages subscribed, which achieves load balances; Conversely, if the
	 * broadcasting is set, each consumer client will consume all subscribed messages
	 * separately. This field defaults to clustering.
	 */
	private MessageModel messageModel = MessageModel.CLUSTERING;

	/**
	 * Consuming point on consumer booting. There are three consuming points:
	 * <ul>
	 * <li><code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it
	 * stopped previously. If it were a newly booting up consumer client, according aging
	 * of the consumer group, there are two cases:
	 * <ol>
	 * <li>if the consumer group is created so recently that the earliest message being
	 * subscribed has yet expired, which means the consumer group represents a lately
	 * launched business, consuming will start from the very beginning;</li>
	 * <li>if the earliest message being subscribed has expired, consuming will start from
	 * the latest messages, meaning messages born prior to the booting timestamp would be
	 * ignored.</li>
	 * </ol>
	 * </li>
	 * <li><code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from
	 * earliest messages available.</li>
	 * <li><code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified
	 * timestamp, which means messages born prior to {@link #consumeTimestamp} will be
	 * ignored</li>
	 * </ul>
	 */
	private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

	/**
	 * Backtracking consumption time with second precision. Time format is
	 * 20131223171201<br>
	 * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
	 * Default backtracking consumption time Half an hour ago.
	 */
	private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));

	/**
	 * Queue allocation algorithm specifying how message queues are allocated to each
	 * consumer clients.
	 */
	private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

	/**
	 * Subscription relationship.
	 */
	private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<>();

	/**
	 * Message listener.
	 */
	private MessageListener messageListener;

	/**
	 * Listener to call if message queue assignment is changed.
	 */
	private MessageQueueListener messageQueueListener;

	/**
	 * Offset Storage.
	 */
	private OffsetStore offsetStore;

	/**
	 * Minimum consumer thread number.
	 */
	private int consumeThreadMin = 20;

	/**
	 * Max consumer thread number.
	 */
	private int consumeThreadMax = 20;

	/**
	 * Threshold for dynamic adjustment of the number of thread pool.
	 */
	private long adjustThreadPoolNumsThreshold = 100000;

	/**
	 * Concurrently max span offset.it has no effect on sequential consumption.
	 */
	private int consumeConcurrentlyMaxSpan = 2000;

	/**
	 * Flow control threshold on queue level, each message queue will cache at most 1000
	 * messages by default, Consider the {@code pullBatchSize}, the instantaneous value
	 * may exceed the limit.
	 */
	private int pullThresholdForQueue = 1000;

	/**
	 * Flow control threshold on queue level, means max num of messages waiting to ack. in
	 * contrast with pull threshold, once a message is popped, it's considered the
	 * beginning of consumption.
	 */
	private int popThresholdForQueue = 96;

	/**
	 * Limit the cached message size on queue level, each message queue will cache at most
	 * 100 MiB messages by default, Consider the {@code pullBatchSize}, the instantaneous
	 * value may exceed the limit.
	 *
	 * <p>
	 * The size(MB) of a message only measured by message body, so it's not accurate.
	 */
	private int pullThresholdSizeForQueue = 100;

	/**
	 * Flow control threshold on topic level, default value is -1(Unlimited)
	 * <p>
	 * The value of {@code pullThresholdForQueue} will be overwritten and calculated based
	 * on {@code pullThresholdForTopic} if it isn't unlimited
	 * <p>
	 * For example, if the value of pullThresholdForTopic is 1000 and 10 message queues
	 * are assigned to this consumer, then pullThresholdForQueue will be set to 100.
	 */
	private int pullThresholdForTopic = -1;

	/**
	 * Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
	 * <p>
	 * The value of {@code pullThresholdSizeForQueue} will be overwritten and calculated
	 * based on {@code pullThresholdSizeForTopic} if it isn't unlimited
	 * <p>
	 * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message
	 * queues are assigned to this consumer, then pullThresholdSizeForQueue will be set to
	 * 100 MiB.
	 */
	private int pullThresholdSizeForTopic = -1;

	/**
	 * Message pull Interval.
	 */
	private long pullInterval = 0;

	/**
	 * Batch consumption size.
	 */
	private int consumeMessageBatchMaxSize = 1;

	/**
	 * Batch pull size.
	 */
	private int pullBatchSize = 32;

	private int pullBatchSizeInBytes = 256 * 1024;

	/**
	 * Whether update subscription relationship when every pull.
	 */
	private boolean postSubscriptionWhenPull = false;

	/**
	 * Whether the unit of subscription group。
	 */
	private boolean unitMode = false;

	/**
	 * Max re-consume times. In concurrently mode, -1 means 16; In orderly mode, -1 means
	 * Integer.MAX_VALUE. If messages are re-consumed more than #maxReconsumeTimes before
	 * success.
	 */
	private int maxReconsumeTimes = -1;

	/**
	 * Suspending pulling time for cases requiring slow pulling like flow-control.
	 * scenario.
	 */
	private long suspendCurrentQueueTimeMillis = 1000;

	/**
	 * Maximum amount of time in minutes a message may block the consuming thread.
	 */
	private long consumeTimeout = 15;

	/**
	 * Maximum amount of invisible time in millisecond of a message, rang is [5000,
	 * 300000].
	 */
	private long popInvisibleTime = 60000;

	/**
	 * Batch pop size. range is [1, 32]
	 */
	private int popBatchNums = 32;

	/**
	 * Maximum time to await message consuming when shutdown consumer, 0 indicates no
	 * await.
	 */
	private long awaitTerminationMillisWhenShutdown = 0;

	/**
	 * Interface of asynchronous transfer data.
	 */
	private TraceDispatcher traceDispatcher = null;

	// force to use client rebalance
	private boolean clientRebalance = true;

	private RPCHook rpcHook = null;

	/**
	 * Default constructor.
	 */
	public DefaultMQPushConsumer() {
		this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
	}

	/**
	 * Constructor specifying consumer group.
	 * @param consumerGroup Consumer group.
	 */
	public DefaultMQPushConsumer(final String consumerGroup) {
		this(consumerGroup, null, new AllocateMessageQueueAveragely());
	}

	/**
	 * Constructor specifying RPC hook.
	 * @param rpcHook RPC hook to execute before each remoting command.
	 */
	public DefaultMQPushConsumer(RPCHook rpcHook) {
		this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
	}

	/**
	 * Constructor specifying consumer group, RPC hook.
	 * @param consumerGroup Consumer group.
	 * @param rpcHook RPC hook to execute before each remoting command.
	 */
	public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
		this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
	}

	/**
	 * Constructor specifying consumer group, enabled msg trace flag and customized trace
	 * topic name.
	 * @param consumerGroup Consumer group.
	 * @param enableMsgTrace Switch flag instance for message trace.
	 * @param customizedTraceTopic The name value of message trace topic.If you don't
	 * config,you can use the default trace topic name.
	 */
	public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace,
			final String customizedTraceTopic) {
		this(consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);
	}

	/**
	 * Constructor specifying consumer group, RPC hook and message queue allocating
	 * algorithm.
	 * @param consumerGroup Consumer group.
	 * @param rpcHook RPC hook to execute before each remoting command.
	 * @param allocateMessageQueueStrategy Message queue allocating algorithm.
	 */
	public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
			AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
		this(consumerGroup, rpcHook, allocateMessageQueueStrategy, false, null);
	}

	/**
	 * Constructor specifying consumer group, RPC hook, message queue allocating
	 * algorithm, enabled msg trace flag and customized trace topic name.
	 * @param consumerGroup Consumer group.
	 * @param rpcHook RPC hook to execute before each remoting command.
	 * @param allocateMessageQueueStrategy message queue allocating algorithm.
	 * @param enableMsgTrace Switch flag instance for message trace.
	 * @param customizedTraceTopic The name value of message trace topic.If you don't
	 * config,you can use the default trace topic name.
	 */
	public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
			AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
			final String customizedTraceTopic) {
		this.consumerGroup = consumerGroup;
		this.rpcHook = rpcHook;
		this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
		defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
		this.enableTrace = enableMsgTrace;
		this.traceTopic = customizedTraceTopic;
	}

	/**
	 * Constructor specifying namespace and consumer group.
	 * @param namespace Namespace for this MQ Producer instance.
	 * @param consumerGroup Consumer group.
	 */
	@Deprecated
	public DefaultMQPushConsumer(final String namespace, final String consumerGroup) {
		this(namespace, consumerGroup, null, new AllocateMessageQueueAveragely());
	}

	/**
	 * Constructor specifying namespace, consumer group and RPC hook .
	 * @param namespace Namespace for this MQ Producer instance.
	 * @param consumerGroup Consumer group.
	 * @param rpcHook RPC hook to execute before each remoting command.
	 */
	@Deprecated
	public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook) {
		this(namespace, consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
	}

	/**
	 * Constructor specifying namespace, consumer group, RPC hook and message queue
	 * allocating algorithm.
	 * @param namespace Namespace for this MQ Producer instance.
	 * @param consumerGroup Consumer group.
	 * @param rpcHook RPC hook to execute before each remoting command.
	 * @param allocateMessageQueueStrategy Message queue allocating algorithm.
	 */
	@Deprecated
	public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
			AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
		this.consumerGroup = consumerGroup;
		this.namespace = namespace;
		this.rpcHook = rpcHook;
		this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
		defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
	}

	/**
	 * Constructor specifying namespace, consumer group, RPC hook, message queue
	 * allocating algorithm, enabled msg trace flag and customized trace topic name.
	 * @param namespace Namespace for this MQ Producer instance.
	 * @param consumerGroup Consumer group.
	 * @param rpcHook RPC hook to execute before each remoting command.
	 * @param allocateMessageQueueStrategy message queue allocating algorithm.
	 * @param enableMsgTrace Switch flag instance for message trace.
	 * @param customizedTraceTopic The name value of message trace topic.If you don't
	 * config,you can use the default trace topic name.
	 */
	@Deprecated
	public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
			AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace,
			final String customizedTraceTopic) {
		this.consumerGroup = consumerGroup;
		this.namespace = namespace;
		this.rpcHook = rpcHook;
		this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
		defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
		this.enableTrace = enableMsgTrace;
		this.traceTopic = customizedTraceTopic;
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	@Override
	public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes)
			throws MQClientException {
		createTopic(key, withNamespace(newTopic), queueNum, 0, null);
	}

	@Override
	public void setUseTLS(boolean useTLS) {
		super.setUseTLS(useTLS);
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	@Override
	public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes)
			throws MQClientException {
		this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	@Override
	public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
		return this.defaultMQPushConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	@Override
	public long maxOffset(MessageQueue mq) throws MQClientException {
		return this.defaultMQPushConsumerImpl.maxOffset(queueWithNamespace(mq));
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	@Override
	public long minOffset(MessageQueue mq) throws MQClientException {
		return this.defaultMQPushConsumerImpl.minOffset(queueWithNamespace(mq));
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	@Override
	public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
		return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	@Override
	public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
			throws MQClientException, InterruptedException {
		return this.defaultMQPushConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	@Override
	public MessageExt viewMessage(String topic, String msgId)
			throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
		try {
			MessageDecoder.decodeMessageId(msgId);
			return this.defaultMQPushConsumerImpl.viewMessage(withNamespace(topic), msgId);
		}
		catch (Exception e) {
			// Ignore
		}
		return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(withNamespace(topic), msgId);
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
		return defaultMQPushConsumerImpl;
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	public void setSubscription(Map<String, String> subscription) {
		Map<String, String> subscriptionWithNamespace = new HashMap<>(subscription.size(), 1);
		for (Entry<String, String> topicEntry : subscription.entrySet()) {
			subscriptionWithNamespace.put(withNamespace(topicEntry.getKey()), topicEntry.getValue());
		}
		this.subscription = subscriptionWithNamespace;
	}

	/**
	 * Send message back to broker which will be re-delivered in future.
	 * <p>
	 * This method will be removed or it's visibility will be changed in a certain version
	 * after April 5, 2020, so please do not use this method.
	 * @param msg Message to send back.
	 * @param delayLevel delay level.
	 * @throws RemotingException if there is any network-tier error.
	 * @throws MQBrokerException if there is any broker error.
	 * @throws InterruptedException if the thread is interrupted.
	 * @throws MQClientException if there is any client error.
	 */
	@Deprecated
	@Override
	public void sendMessageBack(MessageExt msg, int delayLevel)
			throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
		msg.setTopic(withNamespace(msg.getTopic()));
		this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, msg.getBrokerName());
	}

	/**
	 * Send message back to the broker whose name is <code>brokerName</code> and the
	 * message will be re-delivered in future.
	 * <p>
	 * This method will be removed or it's visibility will be changed in a certain version
	 * after April 5, 2020, so please do not use this method.
	 * @param msg Message to send back.
	 * @param delayLevel delay level.
	 * @param brokerName broker name.
	 * @throws RemotingException if there is any network-tier error.
	 * @throws MQBrokerException if there is any broker error.
	 * @throws InterruptedException if the thread is interrupted.
	 * @throws MQClientException if there is any client error.
	 */
	@Deprecated
	@Override
	public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
			throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
		msg.setTopic(withNamespace(msg.getTopic()));
		this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
	}

	@Override
	public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
		return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
	}

	/**
	 * This method gets internal infrastructure readily to serve. Instances must call this
	 * method after configuration.
	 * @throws MQClientException if there is any client error.
	 */
	@Override
	public void start() throws MQClientException {
		setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
		this.defaultMQPushConsumerImpl.start();
		if (enableTrace) {
			try {
				AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME,
						getTraceMsgBatchNum(), traceTopic, rpcHook);
				dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
				dispatcher.setNamespaceV2(namespaceV2);
				traceDispatcher = dispatcher;
				this.defaultMQPushConsumerImpl
					.registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
			}
			catch (Throwable e) {
				log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
			}
		}
		if (null != traceDispatcher) {
			if (traceDispatcher instanceof AsyncTraceDispatcher) {
				((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
			}
			try {
				traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
			}
			catch (MQClientException e) {
				log.warn("trace dispatcher start failed ", e);
			}
		}
	}

	/**
	 * Shut down this client and releasing underlying resources.
	 */
	@Override
	public void shutdown() {
		this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);
		if (null != traceDispatcher) {
			traceDispatcher.shutdown();
		}
	}

	@Override
	@Deprecated
	public void registerMessageListener(MessageListener messageListener) {
		this.messageListener = messageListener;
		this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
	}

	/**
	 * Register a callback to execute on message arrival for concurrent consuming.
	 * @param messageListener message handling callback.
	 */
	@Override
	public void registerMessageListener(MessageListenerConcurrently messageListener) {
		this.messageListener = messageListener;
		this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
	}

	/**
	 * Register a callback to execute on message arrival for orderly consuming.
	 * @param messageListener message handling callback.
	 */
	@Override
	public void registerMessageListener(MessageListenerOrderly messageListener) {
		this.messageListener = messageListener;
		this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
	}

	/**
	 * Subscribe a topic to consuming subscription.
	 * @param topic topic to subscribe.
	 * @param subExpression subscription expression.it only support or operation such as
	 * "tag1 || tag2 || tag3" <br>
	 * if null or * expression,meaning subscribe all
	 * @throws MQClientException if there is any client error.
	 */
	@Override
	public void subscribe(String topic, String subExpression) throws MQClientException {
		this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
	}

	/**
	 * Subscribe a topic to consuming subscription.
	 * @param topic topic to consume.
	 * @param fullClassName full class name,must extend org.apache.rocketmq.common.filter.
	 * MessageFilter
	 * @param filterClassSource class source code,used UTF-8 file encoding,must be
	 * responsible for your code safety
	 */
	@Override
	public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
		this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), fullClassName, filterClassSource);
	}

	/**
	 * Subscribe a topic by message selector.
	 * @param topic topic to consume.
	 * @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector}
	 * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
	 * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
	 */
	@Override
	public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
		this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), messageSelector);
	}

	/**
	 * Un-subscribe the specified topic from subscription.
	 * @param topic message topic
	 */
	@Override
	public void unsubscribe(String topic) {
		this.defaultMQPushConsumerImpl.unsubscribe(topic);
	}

	/**
	 * Update the message consuming thread core pool size.
	 * @param corePoolSize new core pool size.
	 */
	@Override
	public void updateCorePoolSize(int corePoolSize) {
		this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
	}

	/**
	 * Suspend pulling new messages.
	 */
	@Override
	public void suspend() {
		this.defaultMQPushConsumerImpl.suspend();
	}

	/**
	 * Resume pulling.
	 */
	@Override
	public void resume() {
		this.defaultMQPushConsumerImpl.resume();
	}

	public boolean isPause() {
		return this.defaultMQPushConsumerImpl.isPause();
	}

	public boolean isConsumeOrderly() {
		return this.defaultMQPushConsumerImpl.isConsumeOrderly();
	}

	public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
		this.defaultMQPushConsumerImpl.registerConsumeMessageHook(hook);
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	public OffsetStore getOffsetStore() {
		return offsetStore;
	}

	/**
	 * This method will be removed in a certain version after April 5, 2020, so please do
	 * not use this method.
	 */
	@Deprecated
	public void setOffsetStore(OffsetStore offsetStore) {
		this.offsetStore = offsetStore;
	}

	@Override
	public boolean isUnitMode() {
		return unitMode;
	}

	@Override
	public void setUnitMode(boolean isUnitMode) {
		this.unitMode = isUnitMode;
	}

}
